agentmux_srv\backend\wshutil/
rpcio.rs

1#![allow(dead_code)]
2// Copyright 2025-2026, AgentMux Corp.
3// SPDX-License-Identifier: Apache-2.0
4
5//! RPC I/O adapters for different transport modes.
6//! Port of Go's `pkg/wshutil/wshrpcio.go`.
7//!
8//! Provides adapters to convert between:
9//! - Stream (JSON lines) ↔ message channels
10//! - PTY (OSC-wrapped) ↔ message channels
11//! - WebSocket (JSON packets) ↔ message channels
12
13
14use std::io::{BufRead, BufReader, Read, Write};
15use tokio::sync::mpsc;
16use super::osc::encode_wave_osc_bytes;
17
18/// Read JSON lines from a stream and send them to a channel.
19///
20/// Each line is sent as a separate message. Blocks until the reader is exhausted.
21pub fn adapt_stream_to_msg_ch(
22    input: impl Read + Send + 'static,
23    output: mpsc::Sender<Vec<u8>>,
24) -> std::thread::JoinHandle<Result<(), String>> {
25    std::thread::spawn(move || {
26        let reader = BufReader::new(input);
27        for line in reader.lines() {
28            let line = line.map_err(|e| format!("read error: {}", e))?;
29            let trimmed = line.trim().to_string();
30            if trimmed.is_empty() {
31                continue;
32            }
33            output
34                .blocking_send(trimmed.into_bytes())
35                .map_err(|e| format!("channel send error: {}", e))?;
36        }
37        Ok(())
38    })
39}
40
41/// Read messages from a channel and write them as JSON lines to a stream.
42///
43/// Each message is followed by a newline. Blocks until the channel is closed.
44pub async fn adapt_output_ch_to_stream(
45    mut output_ch: mpsc::Receiver<Vec<u8>>,
46    mut output: impl Write,
47) -> Result<(), String> {
48    while let Some(msg) = output_ch.recv().await {
49        output
50            .write_all(&msg)
51            .map_err(|e| format!("write error: {}", e))?;
52        output
53            .write_all(b"\n")
54            .map_err(|e| format!("newline write error: {}", e))?;
55        output.flush().map_err(|e| format!("flush error: {}", e))?;
56    }
57    Ok(())
58}
59
60/// Read messages from a channel and write them as OSC-escaped sequences to a PTY.
61///
62/// Each message is wrapped in the appropriate OSC escape sequence.
63pub async fn adapt_msg_ch_to_pty(
64    mut output_ch: mpsc::Receiver<Vec<u8>>,
65    osc_esc: &str,
66    mut output: impl Write,
67) -> Result<(), String> {
68    if osc_esc.len() != 5 {
69        return Err("osc_esc must be 5 characters".to_string());
70    }
71    while let Some(msg) = output_ch.recv().await {
72        let encoded =
73            encode_wave_osc_bytes(osc_esc, &msg)?;
74        output
75            .write_all(&encoded)
76            .map_err(|e| format!("write error: {}", e))?;
77        output.flush().map_err(|e| format!("flush error: {}", e))?;
78    }
79    Ok(())
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two JSON lines in the input stream arrive as two separate channel
    /// messages, in order and without their trailing newlines, and the reader
    /// thread finishes successfully.
    #[tokio::test]
    async fn test_adapt_stream_to_msg_ch() {
        let input = Cursor::new(b"{\"cmd\":\"test\"}\n{\"cmd\":\"hello\"}\n".to_vec());
        let (sender, mut receiver) = mpsc::channel(10);

        let reader_thread = adapt_stream_to_msg_ch(input, sender);

        for expected in ["{\"cmd\":\"test\"}", "{\"cmd\":\"hello\"}"] {
            let received = receiver.recv().await.unwrap();
            assert_eq!(String::from_utf8(received).unwrap(), expected);
        }

        reader_thread.join().unwrap().unwrap();
    }

    /// A single queued message is written to the stream followed by a
    /// newline, and the adapter returns once the channel is closed.
    #[tokio::test]
    async fn test_adapt_output_ch_to_stream() {
        let (sender, receiver) = mpsc::channel(10);
        sender.send(b"{\"result\":\"ok\"}".to_vec()).await.unwrap();
        // Dropping the only sender closes the channel so the adapter can end.
        drop(sender);

        let mut sink = Vec::new();
        adapt_output_ch_to_stream(receiver, &mut sink).await.unwrap();

        let written = String::from_utf8(sink).unwrap();
        assert_eq!(written, "{\"result\":\"ok\"}\n");
    }
}
114}